
#!/usr/bin/env python3
""" 
Builds a revision-level dataset annotated with each editor's and each article's history (revert and edit counts). Requires a functional Spark installation.

"""

#copy of
#/com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py
#customized to new wikiq version (with regex match)

## customized to deal with parquet


import sys
# add pyspark to your python path e.g.
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
from pyspark.sql.types import IntegerType, StringType
import argparse
import glob
from os import mkdir
from os import path
#from wikiq_util import PERSISTENCE_RADIUS
#read a table

def parse_args():
    """Read the command-line options for this script.

    Options:
        -i/--input_dir     path to the wiki edits (required).
        -o/--output_dir    where results are written (default './output').
        --output_format    'csv' or 'tsv' selects tab-separated text output;
                           any other value, including leaving it unset
                           (None), falls through to parquet in __main__.
        --num_partitions   number of output partitions (default 100).

    Returns:
        argparse.Namespace holding the parsed options.
    """
    option_table = (
        (('-i', '--input_dir'),
         {'help': 'Path to wiki edits.', 'required': True, 'type': str}),
        (('-o', '--output_dir'),
         {'help': 'Output directory', 'default': './output', 'type': str}),
        # A --urlencode option ("whether we need to decode urls") was planned
        # but is not implemented.
        (('--output_format',),
         {'help': "[csv, parquet] format to output", 'type': str}),
        (('--num_partitions',),
         {'help': "number of partitions to output", 'type': int, 'default': 100}),
    )
    cli = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()

if __name__ == "__main__":
    # Only the script entry point needs makedirs, so import it locally.
    from os import makedirs

    args = parse_args()

    # Hand the SparkConf to the builder; previously it was created and then
    # ignored, so the application name was never applied.
    conf = SparkConf().setAppName("Wiki Users Spark")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    inputDir = args.input_dir

    reader = spark.read
    #df = reader.parquet(inputDir) #input is parquet
    # Input is tab-separated with a header row; column types are inferred.
    df = reader.csv(inputDir, sep='\t', inferSchema=True, header=True, mode="PERMISSIVE")

    # Editor key: editor_id when present, otherwise the 'editor' column
    # (NOTE(review): presumably the IP address for anonymous edits).
    # Cast before the coalesce so ids come out as strings, not floats.
    df = df.withColumn("editor_id", df["editor_id"].cast(StringType()))
    df = df.select('*', f.coalesce(df['editor_id'], df['editor']).alias('editor_id_or_ip'))

    # Per-editor and per-article windows ordered by time. No global sort is
    # needed first: each window sorts its own partition, and the shuffle
    # joins below would discard any global order anyway.
    ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
    art_win = Window.orderBy("date_time").partitionBy("articleid")

    # --- Which revision reverted which ------------------------------------
    # 'reverteds' holds a comma-separated list of the revision ids that this
    # revision reverted; turn it into an array column.
    reverteds_df = df.filter(df.reverteds.isNotNull()).select(['revid', 'reverteds', 'editor_id_or_ip', 'date_time'])
    reverteds_df = reverteds_df.withColumn("reverteds", f.split(reverteds_df.reverteds, ','))
    # Position of this revert action among the editor's revert actions
    # (rank: simultaneous actions share a rank).
    reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))

    # One row per (reverting revision, reverted revision) pair.
    reverteds_df_explode = reverteds_df.select(
        reverteds_df.revid.alias('reverted_by'),
        f.explode(reverteds_df.reverteds).alias('reverted_id'),
    )

    # Tag each reverted revision with the revision that reverted it.
    # NOTE(review): if a revid appears in more than one 'reverteds' list this
    # join duplicates that revision's row -- confirm the input never does so.
    df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
    df = df.drop("reverted_id")
    del reverteds_df_explode

    reverteds_df = reverteds_df.select("revid", "editor_nth_revert_action")
    df = df.join(reverteds_df, on=["revid"], how='left_outer')
    del reverteds_df

    # --- Revert counts ----------------------------------------------------
    reverts_df = df.filter(df.revert == True).select('revid', 'articleid', 'editor_id_or_ip', 'date_time', 'revert')
    reverts_df = reverts_df.withColumn('editor_nth_revert', f.rank().over(ed_win))
    reverts_df = reverts_df.withColumn('article_nth_revert', f.rank().over(art_win))

    # Alias revid before joining back onto df: a workaround for the
    # self-join bug https://issues.apache.org/jira/browse/SPARK-14948
    reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"), 'editor_nth_revert', 'article_nth_revert')
    df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
    df = df.drop("r_revid")
    del reverts_df

    # --- Edit counts ------------------------------------------------------
    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month', f.month(df.date_time))

    # The column name must match the one summed in the else-branch; the old
    # check looked for 'collapsed revs' (with a space), so collapsed input
    # never reached the collapsed-edit branch.
    if 'collapsed_revs' not in df.columns:
        # No edit collapsing: every row is one edit.
        df = df.withColumn('editor_nth_edit_nocollapse', f.rank().over(ed_win))
        df = df.withColumn('article_nth_edit_nocollapse', f.rank().over(art_win))
    else:
        # Collapsed rows stand for collapsed_revs edits each; running sums
        # give edit counts, ranks give collapsed-edit counts.
        df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
        df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
        df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
        df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))

    # total editor's token_revs
    #if read_persistence:
    #    df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
    #    df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
    #    df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
    #    df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
    #    df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
    #    df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
    #    df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))

    # --- Output -----------------------------------------------------------
    df.show()

    df = df.repartition(args.num_partitions)

    # makedirs also creates missing parent directories (plain mkdir did not).
    makedirs(args.output_dir, exist_ok=True)
    if args.output_format in ("csv", "tsv"):
        # Both spellings write tab-separated text with a header row.
        df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
    else:
        # Any other value (including the default None) writes parquet.
        print("outputting")
        df.write.parquet(args.output_dir, mode='overwrite')

